package com.atguigu.chapter05.sink;

import com.alibaba.fastjson.JSON;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;

/**
 * Streaming job that reads comma-separated text lines from a socket, converts each line into a
 * {@link WaterSensor}, and writes it as a JSON document to the Elasticsearch index {@code sensor}.
 *
 * <p>Each input line must contain at least three comma-separated fields: a string id, a value
 * parseable as {@code Long}, and a value parseable as {@code Integer}. Surrounding whitespace in
 * each field is ignored; any fields after the third are ignored.
 *
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/6/10 10:09
 */
public class Flink02_Sink_Es_Unbounded {

    /** Number of leading comma-separated fields that are read from every input line. */
    private static final int REQUIRED_FIELDS = 3;

    /**
     * Builds and runs the pipeline: socket source -> line parser -> Elasticsearch sink.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        env
            .socketTextStream("hadoop162", 9999)
            // An anonymous class (rather than a lambda) keeps the output type explicit for Flink.
            .map(new MapFunction<String, WaterSensor>() {
                @Override
                public WaterSensor map(String line) throws Exception {
                    return parseSensor(line);
                }
            })
            .addSink(buildEsSink());

        env.execute();
    }

    /**
     * Creates the Elasticsearch sink that indexes every {@link WaterSensor} into index
     * {@code sensor}, using the sensor id as the document id.
     *
     * @return a configured sink ready to be attached to a stream
     */
    private static ElasticsearchSink<WaterSensor> buildEsSink() {
        ArrayList<HttpHost> hosts = new ArrayList<>();
        hosts.add(new HttpHost("hadoop162", 9200));
        hosts.add(new HttpHost("hadoop163", 9200));
        hosts.add(new HttpHost("hadoop164", 9200));

        ElasticsearchSink.Builder<WaterSensor> esBuilder = new ElasticsearchSink.Builder<>(
            hosts,
            new ElasticsearchSinkFunction<WaterSensor>() {
                @Override
                public void process(WaterSensor waterSensor,
                                    RuntimeContext runtimeContext,
                                    RequestIndexer requestIndexer) {
                    IndexRequest index = Requests
                        .indexRequest()
                        .index("sensor")
                        .type("_doc")  // a type name must not start with an underscore; _doc is the exception
                        .id(waterSensor.getId())
                        .source(JSON.toJSONString(waterSensor), XContentType.JSON);

                    requestIndexer.add(index);
                }
            }
        );

        // Flush after every single action so that records from the unbounded stream
        // are sent to Elasticsearch immediately instead of waiting for a larger batch.
        esBuilder.setBulkFlushMaxActions(1);

        return esBuilder.build();
    }

    /**
     * Parses one input line into a {@link WaterSensor}.
     *
     * @param line raw text line with at least three comma-separated fields
     * @return the sensor built from the first three fields
     * @throws IllegalArgumentException if the line has fewer than three fields
     * @throws NumberFormatException    if the second or third field is not a valid number
     */
    private static WaterSensor parseSensor(String line) {
        String[] fields = line.split(",");
        // String.split drops trailing empty fields, so "a,1," yields only two elements;
        // fail with a message that shows the offending line instead of an index error.
        if (fields.length < REQUIRED_FIELDS) {
            throw new IllegalArgumentException(
                "Expected at least " + REQUIRED_FIELDS + " comma-separated fields but got "
                    + fields.length + " in line: '" + line + "'");
        }
        // Trim so that input such as "s1, 1000, 10" is accepted.
        return new WaterSensor(
            fields[0].trim(),
            Long.valueOf(fields[1].trim()),
            Integer.valueOf(fields[2].trim()));
    }
}
